package cn.osworks.mqtt;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;

public class Subscribe1 implements MqttCallback {

	public static void main(String[] args) {
		int qos = 2;
		String broker = "tcp://localhost:1883";
		String subTopic = "Sample/Java";
		try {
			Subscribe1 Subscribe1 = new Subscribe1(broker, "SampleJavaV3_Subscribe1");
			Subscribe1.subscribe(subTopic, qos);
		} catch (MqttException me) {
			// Display full details of any exception that occurs
			System.out.println("reason " + me.getReasonCode());
			System.out.println("msg " + me.getMessage());
			System.out.println("loc " + me.getLocalizedMessage());
			System.out.println("cause " + me.getCause());
			System.out.println("excep " + me);
			me.printStackTrace();
		}
	}

	// Private instance variables
	private MqttClient client;
	private String brokerUrl;
	private MqttConnectOptions conOpt;

	public Subscribe1(String brokerUrl, String clientId) throws MqttException {
		this.brokerUrl = brokerUrl;
		String tmpDir = System.getProperty("java.io.tmpdir");
		MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
		try {
			conOpt = new MqttConnectOptions();
			conOpt.setCleanSession(true);
			client = new MqttClient(this.brokerUrl, clientId, dataStore);
			client.setCallback(this);
		} catch (MqttException e) {
			e.printStackTrace();
			log("Unable to set up client: " + e.toString());
			System.exit(1);
		}
	}

	public void subscribe(String topicName, int qos) throws MqttException {

		client.connect(conOpt);
		log("Connected to " + brokerUrl + " with client ID " + client.getClientId());
		log("Subscribing to topic \"" + topicName + "\" qos " + qos);
		client.subscribe(topicName, qos);
		log("Press <Enter> to exit");
		try {
			System.in.read();
		} catch (IOException e) {
			// If we can't read we'll just exit
		}
		client.disconnect();
		log("Disconnected");
	}

	private void log(String message) {
		System.out.println(message);
	}

	public void connectionLost(Throwable cause) {
		// Called when the connection to the server has been lost.
		// An application may choose to implement reconnection
		// logic at this point. This sample simply exits.
		log("Connection to " + brokerUrl + " lost!" + cause);
		System.exit(1);
	}

	public void deliveryComplete(IMqttDeliveryToken token) {
		// Called when a message has been delivered to the
		// server. The token passed in here is the same one
		// that was passed to or returned from the original call to publish.
		// This allows applications to perform asynchronous
		// delivery without blocking until delivery completes.
		//
		// This sample demonstrates asynchronous deliver and
		// uses the token.waitForCompletion() call in the main thread which
		// blocks until the delivery has completed.
		// Additionally the deliveryComplete method will be called if
		// the callback is set on the client
		//
		// If the connection to the server breaks before delivery has completed
		// delivery of a message will complete after the client has
		// re-connected.
		// The getPendingTokens method will provide tokens for any messages
		// that are still to be delivered.
	}

	/**
	 * @see MqttCallback#messageArrived(String, MqttMessage)
	 */
	public void messageArrived(String topic, MqttMessage message) throws MqttException {
		String time = new Timestamp(System.currentTimeMillis()).toString();
		System.out.println("Time:\t" + time + "  Topic:\t" + topic + "  Message:\t" + new String(message.getPayload())
				+ "  QoS:\t" + message.getQos());
	}
}
